﻿using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using StackExchange.Redis;
using System.Linq;
using Naruto.Redis.Internal;
using Naruto.Redis.Interface;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Naruto.Redis.Config;
using System.Threading;

namespace Naruto.Redis.Connection
{
    /// <summary>
    /// Redis cache connection: holds the configured options, the primary multiplexer and the
    /// optional sentinel multiplexer.
    /// Author: Zhang Haibo, 2019-08-13.
    /// </summary>
    public partial class RedisConnection : IRedisConnection, IDisposable
    {
        /// <summary>
        /// Logger supplied through the private constructor.
        /// </summary>
        private readonly ILogger logger;

        /// <summary>
        /// Single-permit semaphore (initial count 1, maximum 1) guarding connection creation.
        /// </summary>
        private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1, 1);

        /// <summary>
        /// Redis settings taken from the options wrapper.
        /// </summary>
        private readonly RedisOptions redisOptions;

        /// <summary>
        /// Subscriber obtained from the sentinel connection.
        /// </summary>
        private ISubscriber sentinelsub;

        /// <summary>
        /// Stores the option values and the logger; instances are created through the static factories.
        /// </summary>
        /// <param name="_options">Options wrapper whose <c>Value</c> supplies the redis settings.</param>
        /// <param name="_logger">Logger kept for later use.</param>
        private RedisConnection(IOptions<RedisOptions> _options, ILogger _logger)
        {
            redisOptions = _options.Value;
            logger = _logger;
        }

        // The reference-typed properties below keep an explicit ternary (rather than ?. / ??)
        // so that a null value stored on the options object is returned unchanged.

        /// <summary>
        /// Redis password; an empty string when no options are available.
        /// </summary>
        public string Password => redisOptions == null ? "" : redisOptions.Password;

        /// <summary>
        /// Default database index; 0 when no options are available.
        /// </summary>
        public int DataBase => redisOptions?.DefaultDataBase ?? 0;

        /// <summary>
        /// Redis connection strings; an empty array when no options are available.
        /// </summary>
        public string[] ConnectionEndPoint => redisOptions == null ? new string[] { } : redisOptions.Connection;

        /// <summary>
        /// Whether sentinel subscription is enabled; false when no options are available.
        /// </summary>
        private bool IsSubscribeSentinel => redisOptions?.IsSubscribeSentinel ?? false;

        /// <summary>
        /// Connect timeout; 300 when no options are available.
        /// </summary>
        public int ConnectTimeout => redisOptions?.ConnectTimeout ?? 300;

        /// <summary>
        /// Async timeout; 5000 when no options are available.
        /// </summary>
        public int AsyncTimeout => redisOptions?.AsyncTimeout ?? 5000;

        /// <summary>
        /// The primary redis multiplexer.
        /// </summary>
        public ConnectionMultiplexer Connection { get; private set; }

        /// <summary>
        /// The redis sentinel multiplexer.
        /// </summary>
        private ConnectionMultiplexer RedisSentinelConnection { get; set; }
    }

    public partial class RedisConnection
    {
        /// <summary>
        /// Builds a <see cref="RedisConnection"/> and asynchronously runs its connection setup
        /// before handing the instance back.
        /// </summary>
        /// <param name="_options">Options wrapper whose value supplies the redis settings.</param>
        /// <param name="logger">Logger stored on the new instance.</param>
        /// <returns>The new instance, once the connection setup has completed.</returns>
        public static async Task<RedisConnection> CreateAsync(IOptions<RedisOptions> _options, ILogger logger)
        {
            var connectionHelper = new RedisConnection(_options, logger);
            // Library code does not need the caller's synchronization context; this matches the
            // ConfigureAwait(false) already used by every await inside CreateConnectionAsync.
            await connectionHelper.CreateConnectionAsync().ConfigureAwait(false);
            return connectionHelper;
        }
        /// <summary>
        /// Synchronous factory: constructs a <see cref="RedisConnection"/>, runs its connection
        /// setup, and returns it.
        /// </summary>
        /// <param name="_options">Options wrapper whose value supplies the redis settings.</param>
        /// <param name="logger">Logger stored on the new instance.</param>
        /// <returns>The new instance, once the connection setup has completed.</returns>
        public static RedisConnection Create(IOptions<RedisOptions> _options, ILogger logger)
        {
            var instance = new RedisConnection(_options, logger);

            // Open the multiplexer before exposing the instance to the caller.
            instance.CreateConnection();

            return instance;
        }
        /// <summary>
        /// Synchronously opens <see cref="Connection"/> from the configured endpoints, wires the
        /// multiplexer events to the logging handlers and, when enabled, subscribes to sentinel
        /// notifications. Does nothing when a connected multiplexer already exists.
        /// </summary>
        /// <exception cref="ArgumentNullException">No connection endpoints are configured.</exception>
        private void CreateConnection()
        {
            if (Connection != null && Connection.IsConnected)
            {
                return;
            }
            if (ConnectionEndPoint == null || ConnectionEndPoint.Length == 0)
            {
                // The single-string constructor treats its argument as the parameter name, so the
                // two-argument overload is used to surface the text as the actual message.
                throw new ArgumentNullException(nameof(ConnectionEndPoint), "请填写有效的redis连接字符串,Connection");
            }

            // Acquire before entering the try block so the finally clause can never release a
            // permit that was not taken.
            connectionLock.Wait();
            try
            {
                // Re-check under the lock: another caller may have connected while we waited.
                if (Connection != null && Connection.IsConnected)
                {
                    return;
                }

                var configurationOptions = new ConfigurationOptions()
                {
                    AllowAdmin = true,
                    Password = Password,
                    DefaultDatabase = DataBase,
                    ConnectTimeout = ConnectTimeout,
                    AbortOnConnectFail = false,
                    AsyncTimeout = AsyncTimeout
                };

                // Register every non-blank connection string as an endpoint.
                foreach (var endPoint in ConnectionEndPoint)
                {
                    if (!string.IsNullOrWhiteSpace(endPoint))
                    {
                        configurationOptions.EndPoints.Add(endPoint);
                    }
                }

                Connection = ConnectionMultiplexer.Connect(configurationOptions);

                // Route multiplexer events to the logging handlers.
                Connection.ConnectionFailed += MuxerConnectionFailed;
                Connection.ConnectionRestored += MuxerConnectionRestored;
                Connection.ErrorMessage += MuxerErrorMessage;
                Connection.ConfigurationChanged += MuxerConfigurationChanged;
                Connection.HashSlotMoved += MuxerHashSlotMoved;
                Connection.InternalError += MuxerInternalError;

                // Optionally listen for sentinel notifications.
                if (IsSubscribeSentinel)
                {
                    SubscribeSentinel();
                }
            }
            finally
            {
                connectionLock.Release();
            }
        }

        /// <summary>
        /// Asynchronously opens <see cref="Connection"/> from the configured endpoints, wires the
        /// multiplexer events to the logging handlers and, when enabled, subscribes to sentinel
        /// notifications. Does nothing when a connected multiplexer already exists.
        /// </summary>
        /// <returns>A task that completes when the connection setup has finished.</returns>
        /// <exception cref="ArgumentNullException">No connection endpoints are configured.</exception>
        private async Task CreateConnectionAsync()
        {
            if (Connection != null && Connection.IsConnected)
            {
                return;
            }
            if (ConnectionEndPoint == null || ConnectionEndPoint.Length == 0)
            {
                // The single-string constructor treats its argument as the parameter name, so the
                // two-argument overload is used to surface the text as the actual message.
                throw new ArgumentNullException(nameof(ConnectionEndPoint), "redis链接字符串为null");
            }

            // Acquire before entering the try block so the finally clause can never release a
            // permit that was not taken.
            await connectionLock.WaitAsync().ConfigureAwait(false);
            try
            {
                // Re-check under the lock: another caller may have connected while we waited.
                if (Connection != null && Connection.IsConnected)
                {
                    return;
                }

                var configurationOptions = new ConfigurationOptions()
                {
                    AllowAdmin = true,
                    Password = Password,
                    DefaultDatabase = DataBase,
                    ConnectTimeout = ConnectTimeout,
                    AbortOnConnectFail = false,
                    AsyncTimeout = AsyncTimeout
                };

                // Register every non-blank connection string as an endpoint.
                foreach (var endPoint in ConnectionEndPoint)
                {
                    if (!string.IsNullOrWhiteSpace(endPoint))
                    {
                        configurationOptions.EndPoints.Add(endPoint);
                    }
                }

                Connection = await ConnectionMultiplexer.ConnectAsync(configurationOptions).ConfigureAwait(false);

                // Route multiplexer events to the logging handlers.
                Connection.ConnectionFailed += MuxerConnectionFailed;
                Connection.ConnectionRestored += MuxerConnectionRestored;
                Connection.ErrorMessage += MuxerErrorMessage;
                Connection.ConfigurationChanged += MuxerConfigurationChanged;
                Connection.HashSlotMoved += MuxerHashSlotMoved;
                Connection.InternalError += MuxerInternalError;

                // Optionally listen for sentinel notifications.
                if (IsSubscribeSentinel)
                {
                    await SubscribeSentinelAsync().ConfigureAwait(false);
                }
            }
            finally
            {
                connectionLock.Release();
            }
        }

        #region Sentinel cluster
        /// <summary>
        /// Shared sentinel configuration. The sentinel subscribe methods append the configured
        /// sentinel endpoints to it and fall back to it when no explicit options are passed.
        /// </summary>
        public static ConfigurationOptions sentineloption = new ConfigurationOptions
        {
            ServiceName = "mymaster",
            CommandMap = CommandMap.Sentinel,
            TieBreaker = string.Empty,
        };
        /// <summary>
        /// Connects to the sentinel nodes and subscribes to the "+switch-master" channel so that
        /// master/replica failovers are logged. Does nothing when a connected sentinel multiplexer
        /// already exists.
        /// </summary>
        /// <param name="sentineloptions">
        /// Options used for the sentinel connection; the shared <see cref="sentineloption"/> is used when null.
        /// </param>
        private void SubscribeSentinel(ConfigurationOptions sentineloptions = null)
        {
            if (RedisSentinelConnection != null && RedisSentinelConnection.IsConnected)
            {
                return;
            }

            // Sentinel addresses from configuration. The null-conditional makes the fallback
            // reachable: ToList() itself never returns null, so without it a missing
            // RedisSentinelIp would throw instead of yielding an empty list.
            List<string> sentinelConfig = redisOptions.RedisSentinelIp?.ToList() ?? new List<string>();

            // Add each sentinel node to the shared options unless it is already registered.
            // NOTE(review): endpoints are always appended to the static sentineloption, even when
            // the caller passes its own sentineloptions — confirm this is intended.
            foreach (var address in sentinelConfig)
            {
                var endPoint = RedisBase.ParseEndPoints(address);
                if (!sentineloption.EndPoints.Contains(endPoint))
                {
                    sentineloption.EndPoints.Add(address);
                }
            }
            sentineloptions = sentineloptions ?? sentineloption;

            // Why connect to a sentinel: after a master/replica switch, if a Twemproxy proxy sits
            // in front of redis, the +switch-master event is the moment to notify the proxy, update
            // the master address in its configuration file and restart it; only then is the
            // failover really complete. Without a proxy, change the replica's configuration
            // directly to promote it to master.
            RedisSentinelConnection = ConnectionMultiplexer.Connect(sentineloptions);
            sentinelsub = RedisSentinelConnection.GetSubscriber();

            sentinelsub.Subscribe("+switch-master", (channel, message) =>
            {
                // On failover, the project's redis master/replica settings could be updated here.
                logger.LogInformation("监听到redis主从切换,{message}", message);
            });
        }

        /// <summary>
        /// Asynchronously connects to the sentinel nodes and subscribes to the "+switch-master"
        /// channel so that master/replica failovers are logged. Does nothing when a connected
        /// sentinel multiplexer already exists.
        /// </summary>
        /// <param name="sentineloptions">
        /// Options used for the sentinel connection; the shared <see cref="sentineloption"/> is used when null.
        /// </param>
        /// <returns>A task that completes once the subscription has been registered.</returns>
        private async Task SubscribeSentinelAsync(ConfigurationOptions sentineloptions = null)
        {
            if (RedisSentinelConnection != null && RedisSentinelConnection.IsConnected)
            {
                return;
            }

            // Sentinel addresses from configuration. The null-conditional makes the fallback
            // reachable: ToList() itself never returns null, so without it a missing
            // RedisSentinelIp would throw instead of yielding an empty list.
            List<string> sentinelConfig = redisOptions.RedisSentinelIp?.ToList() ?? new List<string>();

            // Add each sentinel node to the shared options unless it is already registered.
            // NOTE(review): endpoints are always appended to the static sentineloption, even when
            // the caller passes its own sentineloptions — confirm this is intended.
            foreach (var address in sentinelConfig)
            {
                var endPoint = RedisBase.ParseEndPoints(address);
                if (!sentineloption.EndPoints.Contains(endPoint))
                {
                    sentineloption.EndPoints.Add(address);
                }
            }
            sentineloptions = sentineloptions ?? sentineloption;

            // Why connect to a sentinel: after a master/replica switch, if a Twemproxy proxy sits
            // in front of redis, the +switch-master event is the moment to notify the proxy, update
            // the master address in its configuration file and restart it; only then is the
            // failover really complete. Without a proxy, change the replica's configuration
            // directly to promote it to master.
            RedisSentinelConnection = await ConnectionMultiplexer.ConnectAsync(sentineloptions).ConfigureAwait(false);
            sentinelsub = RedisSentinelConnection.GetSubscriber();

            // Subscribe to the failover event without blocking the calling thread.
            await sentinelsub.SubscribeAsync("+switch-master", (channel, message) =>
            {
                // On failover, the project's redis master/replica settings could be updated here.
                logger.LogInformation("监听到redis主从切换,{message}", message);
            }).ConfigureAwait(false);
        }

        #endregion
        #region Events

        /// <summary>
        /// Handler for the multiplexer's ConfigurationChanged event; logs the affected endpoint
        /// at Information level.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Event data carrying the endpoint.</param>
        private void MuxerConfigurationChanged(object sender, EndPointEventArgs e)
            => logger.LogInformation($"Configuration changed: {e.EndPoint}");

        /// <summary>
        /// Handler for the multiplexer's ErrorMessage event; logs the reported message at Error level.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Event data carrying the error message.</param>
        private void MuxerErrorMessage(object sender, RedisErrorEventArgs e)
        {
            var text = string.Concat("ErrorMessage: ", e.Message);
            logger.LogError(text);
        }

        /// <summary>
        /// Handler for the multiplexer's ConnectionRestored event; logs the endpoint whose
        /// connection came back.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Event data carrying the restored endpoint.</param>
        private void MuxerConnectionRestored(object sender, ConnectionFailedEventArgs e)
        {
            // A restored connection is a recovery, not a failure: log it as information so it
            // does not raise false error alerts.
            logger.LogInformation("ConnectionRestored: " + e.EndPoint);
        }

        /// <summary>
        /// Handler for the multiplexer's ConnectionFailed event; logs the endpoint, the failure
        /// type and, when present, the underlying exception at Error level.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Event data carrying the endpoint, failure type and optional exception.</param>
        private void MuxerConnectionFailed(object sender, ConnectionFailedEventArgs e)
        {
            var exceptionSuffix = e.Exception == null ? "" : (", " + e.Exception.Message);

            // Hand the exception object to the logger as well, so the stack trace and inner
            // exceptions are recorded instead of only the message text.
            logger.LogError(e.Exception, "重新连接：Endpoint failed: " + e.EndPoint + ", " + e.FailureType + exceptionSuffix);
        }

        /// <summary>
        /// Handler for the multiplexer's HashSlotMoved event; logs the new and the old endpoint
        /// at Information level.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Event data carrying the new and old endpoints.</param>
        private void MuxerHashSlotMoved(object sender, HashSlotMovedEventArgs e)
            => logger.LogInformation($"HashSlotMoved:NewEndPoint{e.NewEndPoint}, OldEndPoint{e.OldEndPoint}");

        /// <summary>
        /// Handler for the multiplexer's InternalError event; logs the exception raised inside the
        /// redis library at Error level.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Event data carrying the internal exception.</param>
        private void MuxerInternalError(object sender, InternalErrorEventArgs e)
        {
            // Pass the exception itself so the stack trace is kept, and guard the message access
            // so a missing exception cannot make the handler throw.
            logger.LogError(e.Exception, "InternalError:Message" + e.Exception?.Message);
        }
        #endregion Events

        /// <summary>
        /// Closes and disposes the primary and sentinel multiplexers (when they exist) and
        /// releases the connection semaphore.
        /// </summary>
        public void Dispose()
        {
            Connection?.Close();
            Connection?.Dispose();
            RedisSentinelConnection?.Close();
            RedisSentinelConnection?.Dispose();

            // The semaphore is an owned IDisposable field; release it with the instance.
            connectionLock.Dispose();
        }
    }
}
